package org.firebug.spring.boot.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Minimal RocketMQ usage examples written as JUnit tests: synchronous send,
 * push consumption, asynchronous send and one-way send.
 *
 * <p>Every test talks to the name server at {@link #NAME_SERVER_ADDRESS}, so these are
 * integration examples that need a reachable broker rather than isolated unit tests.
 */
public class SimpleExample {
    /** Address of the name server used by every example. */
    private static final String NAME_SERVER_ADDRESS = "rocketmq.firebug.org:9876";
    /** Group name shared by the producers and the consumer in these examples. */
    private static final String GROUP_NAME = "my_unique_group_name";
    /** Topic every example publishes to / subscribes from. */
    private static final String TOPIC = "TopicTest";
    /** Tag attached to every published message. */
    private static final String TAG = "TagA";

    /**
     * Sends 100 messages synchronously and prints each {@link SendResult}.
     *
     * @throws Exception if the producer cannot start or a send fails
     */
    @Test
    public void produce() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(GROUP_NAME);
        producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                // Encode with an explicit charset so the payload does not depend on the platform default.
                byte[] body = ("produce:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
                Message msg = new Message(TOPIC, TAG, body);
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
        } finally {
            // Release the client even when a send throws.
            producer.shutdown();
        }
    }

    /**
     * Subscribes to all tags of the topic and prints every received batch for ten seconds.
     *
     * @throws Exception if the consumer cannot subscribe or start, or the wait is interrupted
     */
    @Test
    public void consume() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_NAME);
        consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        consumer.subscribe(TOPIC, "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        try {
            System.out.printf("Consumer Started.%n");
            // Keep the test thread alive so the listener has a window in which to receive messages.
            Thread.sleep(10000L);
        } finally {
            // Previously the consumer was never shut down; stop it so the client does not outlive the test.
            consumer.shutdown();
        }
    }

    /**
     * Sends one message asynchronously and waits (up to five seconds) for its callback.
     *
     * @throws Exception if the producer cannot start, the send is rejected, or the wait is interrupted
     */
    @Test
    public void asyncSendMessage() throws Exception {
        final int messageCount = 1;
        DefaultMQProducer producer = new DefaultMQProducer(GROUP_NAME);
        producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        producer.start();
        try {
            producer.setRetryTimesWhenSendAsyncFailed(0);
            // One count per message; released from whichever callback fires for it.
            final CountDownLatch pending = new CountDownLatch(messageCount);
            for (int i = 0; i < messageCount; i++) {
                final int index = i;
                Message msg = new Message(TOPIC, TAG, "OrderID188", "asyncSendMessage".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                        pending.countDown();
                    }

                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                        pending.countDown();
                    }
                });
            }
            // Wait for the callbacks instead of sleeping blindly; the five second cap matches the old sleep.
            if (!pending.await(5000L, TimeUnit.MILLISECONDS)) {
                System.out.printf("Timed out waiting for %d async send callback(s)%n", pending.getCount());
            }
        } finally {
            producer.shutdown();
        }
    }

    /**
     * Sends one message in one-way mode, i.e. without inspecting any send result.
     *
     * @throws Exception if the producer cannot start or the send fails locally
     */
    @Test
    public void sendMessageByOneWayMode() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(GROUP_NAME);
        producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        producer.start();
        try {
            for (int i = 0; i < 1; i++) {
                Message msg = new Message(TOPIC, TAG, ("sendMessageByOneWayMode " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Call send message to deliver message to one of brokers.
                producer.sendOneway(msg);
            }
        } finally {
            producer.shutdown();
        }
    }


}